package io.kestra.plugin.core.flow;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.ListUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.PositiveOrZero;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.util.List;
import java.util.Optional;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Execute a group of tasks for each value in the list.",
    description = """
        You can control how many task groups are executed concurrently by setting the `concurrencyLimit` property. \

        - A `concurrencyLimit` of `0` means no limit — all task groups run in parallel. \

        - A `concurrencyLimit` of `1` means full serialization — only one task group runs at a time, in order. \

        - A `concurrencyLimit` greater than `1` allows up to that number of task groups to run in parallel. \


        Regardless of the `concurrencyLimit` property, the `tasks` will run one after the other — to run those in parallel, wrap them in a [Parallel](https://kestra.io/plugins/core/tasks/flow/io.kestra.plugin.core.flow.parallel) task as shown in the last example below (_see the flow `parallel_tasks_example`_). \


        The `values` can be defined as a JSON string or an array, e.g. a list of string values `["value1", "value2"]` or a list of key-value pairs `[{"key": "value1"}, {"key": "value2"}]`.\s


        Access the current iteration value using `{{ taskrun.value }}` \
        or `{{ parent.taskrun.value }}` when inside a nested child task. \
        The iteration number is available via `{{ taskrun.iteration }}`. \


        If you need to execute more than 2-5 tasks for each value, we recommend triggering a subflow for each value for better performance and modularity. \
        See the [flow best practices documentation](https://kestra.io/docs/best-practices/flows) for more details."""
)
@Plugin(
    examples = {
        @Example(
            full = true,
            title = """
                The `{{ taskrun.value }}` from the `for_each` task is available only to direct child tasks \
                such as the `before_if` and the `if` tasks. To access the taskrun value of the parent task \
                in a nested child task such as the `after_if` task, use `{{ parent.taskrun.value }}`.""",
            code = """
                id: for_loop_example
                namespace: company.team

                tasks:
                  - id: for_each
                    type: io.kestra.plugin.core.flow.ForEach
                    values: ["value 1", "value 2", "value 3"]
                    tasks:
                      - id: before_if
                        type: io.kestra.plugin.core.debug.Return
                        format: "Before if {{ taskrun.value }}"
                      - id: if
                        type: io.kestra.plugin.core.flow.If
                        condition: '{{ taskrun.value == "value 2" }}'
                        then:
                          - id: after_if
                            type: io.kestra.plugin.core.debug.Return
                            format: "After if {{ parent.taskrun.value }}"
                """
        ),
        @Example(
            full = true,
            title = """
                This flow uses YAML-style array for `values`. The task `for_each` iterates over a list of values \
                and executes the `return` child task for each value. The `concurrencyLimit` property is set to 2, \
                so the `return` task will run concurrently for the first two values in the list at first. \
                The `return` task will run for the next two values only after the task runs for the first two values \
                have completed.""",
            code = """
                id: for_each_value
                namespace: company.team

                tasks:
                  - id: for_each
                    type: io.kestra.plugin.core.flow.ForEach
                    values:
                      - value 1
                      - value 2
                      - value 3
                      - value 4
                    concurrencyLimit: 2
                    tasks:
                      - id: return
                        type: io.kestra.plugin.core.debug.Return
                        format: "{{ task.id }} with value {{ taskrun.value }}"
                """
        ),
        @Example(
            full = true,
            title = """
                This example shows how to run tasks in parallel for each value in the list. \
                All child tasks of the `parallel` task will run in parallel. \
                However, due to the `concurrencyLimit` property set to 2, \
                only two `parallel` task groups will run at any given time.""",
            code = """
                id: parallel_tasks_example
                namespace: company.team

                tasks:
                  - id: for_each
                    type: io.kestra.plugin.core.flow.ForEach
                    values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
                    concurrencyLimit: 2
                    tasks:
                      - id: parallel
                        type: io.kestra.plugin.core.flow.Parallel
                        tasks:
                        - id: log
                          type: io.kestra.plugin.core.log.Log
                          message: Processing {{ parent.taskrun.value }}
                        - id: shell
                          type: io.kestra.plugin.scripts.shell.Commands
                          commands:
                            - sleep {{ parent.taskrun.value }}
                """
        ),
        @Example(
            full = true,
            title = """
                This example demonstrates processing data across nested loops of S3 buckets, years, and months. \
                It generates structured identifiers (e.g., `bucket1_2025_March`) by combining values from each loop level, \
                while accessing parent loop values like years and buckets, which can be useful for partitioned \
                storage paths or time-based datasets. The flow uses dynamic expressions referencing parent context.""",
            code = """
                id: loop_multiple_times
                namespace: company.team

                inputs:
                  - id: s3_buckets
                    type: ARRAY
                    itemType: STRING
                    defaults:
                      - bucket1
                      - bucket2

                  - id: years
                    type: ARRAY
                    itemType: INT
                    defaults:
                      - 2025
                      - 2026

                  - id: months
                    type: ARRAY
                    itemType: STRING
                    defaults:
                      - March
                      - April

                tasks:
                  - id: buckets
                    type: io.kestra.plugin.core.flow.ForEach
                    values: "{{inputs.s3_buckets}}"
                    tasks:
                      - id: year
                        type: io.kestra.plugin.core.flow.ForEach
                        values: "{{inputs.years}}"
                        tasks:
                          - id: month
                            type: io.kestra.plugin.core.flow.ForEach
                            values: "{{inputs.months}}"
                            tasks:
                              - id: full_table_name
                                type: io.kestra.plugin.core.log.Log
                                message: |
                                  Full table name: {{parents[1].taskrun.value }}_{{parent.taskrun.value}}_{{taskrun.value}}
                                  Direct/current loop (months): {{taskrun.value}}
                                  Value of loop one higher up (years): {{parents[0].taskrun.value}}
                                  Further up (buckets): {{parents[1].taskrun.value}}
                """
        ),
    }
)
// Flowable task that runs its child tasks once for every entry of `values`.
//
// How the per-value task groups are scheduled depends on `concurrencyLimit`:
// a limit of 1 is resolved through FlowableUtils.resolveSequentialNexts, any other
// value is resolved through FlowableUtils.resolveConcurrentNexts with that limit.
//
// (Plain line comment on purpose: a Javadoc block placed between the class
// annotations and the `class` keyword is not attached to the class.)
public class ForEach extends Sequential implements FlowableTask<VoidOutput> {
    /**
     * Values to iterate over. Declared as {@link Object} because the schema accepts
     * either a string or an array; it is handed as-is to
     * {@link FlowableUtils#resolveEachTasks}.
     */
    @NotNull
    @PluginProperty(dynamic = true)
    @Schema(
        title = "The list of values for which Kestra will execute a group of tasks",
        description = "The values can be passed as a string, a list of strings, or a list of objects.",
        oneOf = {String.class, Object[].class}
    )
    private Object values;

    /**
     * Maximum number of task groups running at the same time; defaults to {@code 1}.
     */
    @PositiveOrZero
    @NotNull
    @Builder.Default
    @Schema(
        title = "The number of concurrent task groups for each value in the `values` array",
        description = """
            A `concurrencyLimit` of 0 means no limit — all task groups run in parallel.

            A `concurrencyLimit` of 1 means full serialization — only one task group runs at a time, in order.

            A `concurrencyLimit` greater than 1 allows up to the specified number of task groups to run in parallel.
            """
    )
    @PluginProperty
    private final Integer concurrencyLimit = 1;

    /**
     * Builds the topology sub-graph for this task.
     *
     * @param execution    the execution the graph is built for
     * @param taskRun      the task run of this {@code ForEach}
     * @param parentValues the values of the enclosing iterations
     * @return a {@link RelationType#DYNAMIC} cluster containing the child tasks laid out sequentially
     * @throws IllegalVariableEvaluationException propagated from the graph construction
     */
    @Override
    public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
        GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.DYNAMIC);

        // ForEach executes task groups concurrently, not the task inside the group concurrently,
        // so the topology should display it as a sequential.
        GraphUtils.sequential(
            subGraph,
            this.getTasks(),
            this.getErrors(),
            this.getFinally(),
            taskRun,
            execution
        );

        return subGraph;
    }

    /**
     * Resolves the child tasks for the configured {@code values}.
     *
     * @param runContext    the run context passed to the resolution
     * @param parentTaskRun the task run of this {@code ForEach}
     * @return the result of {@link FlowableUtils#resolveEachTasks} for this task's children and values
     * @throws IllegalVariableEvaluationException propagated from the resolution
     */
    @Override
    public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
        return FlowableUtils.resolveEachTasks(runContext, parentTaskRun, this.getTasks(), this.values);
    }

    /**
     * Resolves the state of this task from its enabled child tasks.
     *
     * @param runContext    the run context passed to the resolution
     * @param execution     the current execution
     * @param parentTaskRun the task run of this {@code ForEach}
     * @return {@link State.Type#SUCCESS} when no enabled child task remains, otherwise the result of
     *         {@link FlowableUtils#resolveState}
     * @throws IllegalVariableEvaluationException propagated from the resolution
     */
    @Override
    public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
        List<ResolvedTask> enabledChildTasks = ListUtils.emptyOnNull(this.childTasks(runContext, parentTaskRun)).stream()
            // Null-safe on purpose: negating a null boxed flag would throw on unboxing;
            // only an explicit `true` excludes the task.
            .filter(resolvedTask -> !Boolean.TRUE.equals(resolvedTask.getTask().getDisabled()))
            .toList();

        // Nothing left to run (no values, or every child is disabled).
        if (enabledChildTasks.isEmpty()) {
            return Optional.of(State.Type.SUCCESS);
        }

        return FlowableUtils.resolveState(
            execution,
            enabledChildTasks,
            FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
            FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
            parentTaskRun,
            runContext,
            this.isAllowFailure(),
            this.isAllowWarning()
        );
    }

    /**
     * Resolves the next task runs to create.
     *
     * @param runContext    the run context passed to the resolution
     * @param execution     the current execution
     * @param parentTaskRun the task run of this {@code ForEach}
     * @return the result of {@link FlowableUtils#resolveSequentialNexts} when {@code concurrencyLimit} is 1,
     *         otherwise the result of {@link FlowableUtils#resolveConcurrentNexts} with that limit
     * @throws IllegalVariableEvaluationException propagated from the resolution
     */
    @Override
    public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
        // Resolved once, in the same order as before, and shared by both scheduling modes.
        List<ResolvedTask> iterationTasks = this.childTasks(runContext, parentTaskRun);
        List<ResolvedTask> errorTasks = FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun);
        List<ResolvedTask> finallyTasks = FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun);

        // Integer compared against an int literal: numeric comparison after unboxing.
        if (this.concurrencyLimit == 1) {
            return FlowableUtils.resolveSequentialNexts(
                execution,
                iterationTasks,
                errorTasks,
                finallyTasks,
                parentTaskRun
            );
        }

        return FlowableUtils.resolveConcurrentNexts(
            execution,
            iterationTasks,
            errorTasks,
            finallyTasks,
            parentTaskRun,
            this.concurrencyLimit
        );
    }
}
